#include "config.h"

#include <dirent.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBLSV

#include "fileinfo.h"
#include "md_map.h"
#include "net_global.h"
#include "stor_rpc.h"
#include "vnode.h"
#include "volume_ctl.h"
#include "volume_proto.h"
#include "yid.h"
#include "ypage.h"
#include "schedule.h"

#include "lba_lock.h"
#include "lsv_bitmap.h"
#include "lsv_volume.h"
#include "lsv_bitmap_internal.h"
#include "lsv_lib.h"
#include "lsv_rcache.h"
#include "lsv_volume_proto.h"
#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"
#include "row2_bitmap.h"
#include "row3_volume_proto_io.h"
#include "row_low_tlb_io.h"

#include <buffer.h>
#include <stack.h>
#include "types.h"

#define USE_SPAN_LOCK 1
#define LOCK_GRANULARITY (2 * CHUNK_SIZE)

#define ROW3_LOCK_OPTIMIZE       0

typedef struct row3_extents{
        /* One contiguous run of pages inside a single chunk.  Produced from
         * a bitmap span by row3_bitmaps_to_extents(); a run that does not
         * fit the current chunk is continued through `next`
         * (see row3_allocate_to_extent / row3_free_associated_extents). */
        uint32_t vvol_id;       /* owning virtual-volume id (read path only) */
        uint32_t chunk_id;      /* 0 means "no backing chunk yet" (hole) */
        uint32_t chunk_off;     /* byte offset inside the chunk */
        uint32_t length;        /* run length in bytes */
        struct row3_extents *next;      /* heap-allocated continuation, or NULL */
} row3_extents_t;

#define ROW3_LOCK_LEVEL_NO_LOCK              0
#define ROW3_LOCK_LEVEL_SPAN_RLOCK           1
#define ROW3_LOCK_LEVEL_SPAN_WLOCK           2
#define ROW3_LOCK_LEVEL_BITMAP_COW_LOCK      3

typedef struct
{
        /* Per-IO lock decision, filled once by row3_lock_probe() and then
         * consumed by row3_volume_io_lock()/row3_paged_write_align(). */
        int lock_level;         /* one of ROW3_LOCK_LEVEL_* */
        int lock_set;           /* 1 once the probe has run (probe-once latch) */
        void *user_data;        /* bitmap cached by the probe, or NULL */
}row3_lock_switch_t;

/*
 * Take read locks on every LOCK_GRANULARITY-aligned slot covered by
 * [off, off + len).  All-or-nothing: on any failure the locks taken so
 * far are reverted through the lock-table transaction and the error is
 * returned.  Returns 0 on success, ENOMEM when no transaction could be
 * started, or the ltable_rdlock() error.
 */
int row3_span_rdlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        void *trans = ltable_start_transaction(&lsv_info->lock_table);
        uint64_t cur = off;
        uint32_t remain = len;
        int ret;

        if (trans == NULL)
                return ENOMEM;

        while (remain) {
                /* step to the next granularity boundary (or the span end) */
                int step = _min(remain, (LOCK_GRANULARITY - cur % LOCK_GRANULARITY));
                assert(step);

                ret = ltable_rdlock(&lsv_info->lock_table, round_down(cur, LOCK_GRANULARITY));
                if (ret) {
                        /* undo every slot already recorded in the transaction */
                        ltable_revert_transaction(&lsv_info->lock_table, trans);
                        return ret;
                }

                /* record the slot so a later failure can revert it */
                ltable_transaction_go(&lsv_info->lock_table, trans, round_down(cur, LOCK_GRANULARITY));

                remain -= step;
                cur += step;
        }

        ltable_stop_transaction(&lsv_info->lock_table, trans);

        return 0;
}

/*
 * Take write locks on every LOCK_GRANULARITY-aligned slot covered by
 * [off, off + len).  Mirror of row3_span_rdlock(): all-or-nothing via a
 * lock-table transaction.  Returns 0, ENOMEM, or the ltable_wrlock() error.
 */
int row3_span_wrlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        void *trans = ltable_start_transaction(&lsv_info->lock_table);
        uint64_t cur = off;
        uint32_t remain = len;
        int ret;

        if (trans == NULL)
                return ENOMEM;

        while (remain) {
                /* step to the next granularity boundary (or the span end) */
                int step = _min(remain, (LOCK_GRANULARITY - cur % LOCK_GRANULARITY));
                assert(step);

                ret = ltable_wrlock(&lsv_info->lock_table, round_down(cur, LOCK_GRANULARITY));
                if (ret) {
                        /* undo every slot already recorded in the transaction */
                        ltable_revert_transaction(&lsv_info->lock_table, trans);
                        return ret;
                }

                /* record the slot so a later failure can revert it */
                ltable_transaction_go(&lsv_info->lock_table, trans, round_down(cur, LOCK_GRANULARITY));

                remain -= step;
                cur += step;
        }

        ltable_stop_transaction(&lsv_info->lock_table, trans);

        return 0;
}

/*
 * Release the per-slot locks taken by row3_span_rdlock()/row3_span_wrlock().
 * Must be called with exactly the same (off, len) range.
 */
void row3_span_unlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cur = off;
        uint32_t remain = len;

        while (remain) {
                int step = _min(remain, (LOCK_GRANULARITY - cur % LOCK_GRANULARITY));
                assert(step);

                ltable_unlock(&lsv_info->lock_table, round_down(cur, LOCK_GRANULARITY));

                remain -= step;
                cur += step;
        }
}

#define COW_LOCK_GRANULARITY (512 * CHUNK_SIZE)

/*
 * Read-lock the bitmap-COW table for every COW_LOCK_GRANULARITY-aligned
 * slot covered by [off, off + len).  All-or-nothing: a failure reverts
 * the locks already taken through the lock-table transaction.
 * Returns 0 on success, ENOMEM when no transaction could be started, or
 * the ltable_rdlock() error.
 */
int row3_bitmap_span_rdlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
#if !USE_SPAN_LOCK
        /* BUG FIX: was a bare `return;` in a non-void function — a
         * constraint violation that breaks the build when USE_SPAN_LOCK
         * is 0.  The no-op path reports success without locking. */
        return 0;
#endif
        int ret;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        void * trans = ltable_start_transaction(&lsv_info->bitmap_cow_lt);
        if(!trans)
                return ENOMEM;

        while (len) {
                /* step to the next COW-granule boundary (or the span end) */
                int clen = _min(len, (COW_LOCK_GRANULARITY - off % COW_LOCK_GRANULARITY));
                assert(clen);

                ret = ltable_rdlock(&lsv_info->bitmap_cow_lt, round_down(off, COW_LOCK_GRANULARITY));
                if(ret) {
                        ltable_revert_transaction(&lsv_info->bitmap_cow_lt, trans);
                        return ret;
                }

                /* record the slot so a later failure can revert it */
                ltable_transaction_go(&lsv_info->bitmap_cow_lt, trans, round_down(off, COW_LOCK_GRANULARITY));

                len -= clen;
                off += clen;
        }

        ltable_stop_transaction(&lsv_info->bitmap_cow_lt, trans);

        return 0;
}

/*
 * Write-lock the bitmap-COW table for every COW_LOCK_GRANULARITY-aligned
 * slot covered by [off, off + len).  Mirror of row3_bitmap_span_rdlock():
 * all-or-nothing via a lock-table transaction.
 * Returns 0, ENOMEM, or the ltable_wrlock() error.
 */
static int row3_bitmap_span_wrlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
#if !USE_SPAN_LOCK
        /* BUG FIX: was a bare `return;` in a non-void function — a
         * constraint violation that breaks the build when USE_SPAN_LOCK
         * is 0.  The no-op path reports success without locking. */
        return 0;
#endif
        int ret;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;

        void * trans = ltable_start_transaction(&lsv_info->bitmap_cow_lt);
        if(!trans)
                return ENOMEM;

        while (len) {
                /* step to the next COW-granule boundary (or the span end) */
                int clen = _min(len, (COW_LOCK_GRANULARITY - off % COW_LOCK_GRANULARITY));
                assert(clen);

                ret = ltable_wrlock(&lsv_info->bitmap_cow_lt, round_down(off, COW_LOCK_GRANULARITY));
                if(ret) {
                        ltable_revert_transaction(&lsv_info->bitmap_cow_lt, trans);
                        return ret;
                }

                /* record the slot so a later failure can revert it.
                 * (dropped the stale `assert(!ret)` that followed: ret was
                 * already checked and returned above, so it was dead code) */
                ltable_transaction_go(&lsv_info->bitmap_cow_lt, trans, round_down(off, COW_LOCK_GRANULARITY));

                len -= clen;
                off += clen;
        }

        ltable_stop_transaction(&lsv_info->bitmap_cow_lt, trans);

        return 0;
}

/*
 * Release the per-slot bitmap-COW locks taken by
 * row3_bitmap_span_rdlock()/row3_bitmap_span_wrlock().
 * Must be called with exactly the same (off, len) range.
 */
void row3_bitmap_span_unlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
#if !USE_SPAN_LOCK
        return; /* span locking disabled: nothing was locked */
#endif

        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cur = off;
        uint32_t remain = len;

        while (remain) {
                int step = _min(remain, (COW_LOCK_GRANULARITY - cur % COW_LOCK_GRANULARITY));
                assert(step);

                ltable_unlock(&lsv_info->bitmap_cow_lt, round_down(cur, COW_LOCK_GRANULARITY));

                remain -= step;
                cur += step;
        }
}

/*
 * COW callback for a bitmap chunk: set the snapshot reference flag on
 * every lsv_bitmap_unit_t packed into the CHUNK_SIZE buffer.
 */
void row3_bitmap_cow_callback(uint8_t *dataptr)
{
        lsv_bitmap_unit_t *unit = (lsv_bitmap_unit_t *)dataptr;
        const size_t nr = CHUNK_SIZE / sizeof(lsv_bitmap_unit_t);

        DBUG("row3_bitmap_cow_callback enter\r\n");

        for (size_t i = 0; i < nr; i++)
                unit[i].ref = 1;
}

/*
 * Write-lock the span needed for a bitmap COW: the range is first widened
 * to COW_LOCK_GRANULARITY alignment, then locked slot by slot in
 * LOCK_GRANULARITY steps on the general lock_table (all-or-nothing via a
 * lock-table transaction).  Returns 0, ENOMEM, or the ltable_wrlock error.
 *
 * NOTE(review): the lock key here is `new_off / LOCK_GRANULARITY` (a slot
 * index), while row3_span_wrlock() keys the same lock_table with
 * round_down(off, LOCK_GRANULARITY) (a byte offset).  The two key spaces
 * would not exclude each other for the same region — confirm which form
 * is intended.
 */
int row3_bitmap_span_wrlock1(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        int ret;
        uint64_t new_off;
        uint32_t new_len;

        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        void * trans = ltable_start_transaction(&lsv_info->lock_table);
        if(!trans)
                return ENOMEM;

        /* widen [off, off+len) to whole COW granules */
        range_align(COW_LOCK_GRANULARITY, off, len, &new_off, &new_len);

        while (new_len) {
                ret = ltable_wrlock(&lsv_info->lock_table, new_off / LOCK_GRANULARITY);
                if(unlikely(ret)) {
                        /* undo every slot already recorded in the transaction */
                        ltable_revert_transaction(&lsv_info->lock_table, trans);
                        return ret;
                }

                ltable_transaction_go(&lsv_info->lock_table, trans, new_off / LOCK_GRANULARITY);

                new_len -= LOCK_GRANULARITY;
                new_off += LOCK_GRANULARITY;
        }

        ltable_stop_transaction(&lsv_info->lock_table, trans);

        return 0;
}

/*
 * Release the locks taken by row3_bitmap_span_wrlock1(); the same
 * COW_LOCK_GRANULARITY widening is re-applied so the slot keys match.
 *
 * NOTE(review): uses `new_off / LOCK_GRANULARITY` as the key, matching
 * row3_bitmap_span_wrlock1() but not row3_span_unlock() — see the note
 * on the lock function.
 */
void row3_bitmap_span_unlock1(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        uint64_t new_off;
        uint32_t new_len;

        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;

        /* widen [off, off+len) to whole COW granules, as the lock side did */
        range_align(COW_LOCK_GRANULARITY, off, len, &new_off, &new_len);

        while (new_len) {
                ltable_unlock(&lsv_info->lock_table, new_off / LOCK_GRANULARITY);

                new_len -= LOCK_GRANULARITY;
                new_off += LOCK_GRANULARITY;
        }
}

/*
static inline int row3_pages_to_extents(uint64_t *pages, uint32_t count, row3_extents_t *extents, uint32_t *ext_count)
{
        int ext_idx = 0;
        row3_extents_t *extent = extents;

        extent[ext_idx].offset = pages[0] * LSV_PAGE_SIZE;
        extent[ext_idx].length = LSV_PAGE_SIZE;

        for (int i = 1; i < count; i++) {
                if (pages[i] * LSV_PAGE_SIZE == extent[ext_idx].offset + extent[ext_idx].length) {
                        extent[ext_idx].length += LSV_PAGE_SIZE;
                } else {
                        ext_idx++;
                        if (ext_idx >= *ext_count)
                                return -1;

                        extent[ext_idx].offset = pages[i] * LSV_PAGE_SIZE;
                        extent[ext_idx].length = LSV_PAGE_SIZE;
                }
        }

        *ext_count = ext_idx + 1;

        return 0;
}
*/

/*
 * Coalesce `count` consecutive bitmap units into contiguous extents.
 * Adjacent units merge when they name consecutive pages of the same chunk
 * (and the same vvol).  On the write path (write != 0) any
 * snapshot-referenced unit (ref set) is reset to a hole, so a fresh chunk
 * is allocated for it later by row3_allocate_for_extents().
 *
 * In/out: *ext_count holds the capacity of `extents` on entry and the
 * number of extents produced on return.  Returns 0 on success, or -1 when
 * the capacity would be exceeded.
 */
static inline int row3_bitmaps_to_extents(lsv_bitmap_unit_t *bitmaps, uint32_t count, row3_extents_t *extents, uint32_t *ext_count, int write)
{
        int ext_idx = 0;
        row3_extents_t *extent = extents;

        if(write)
                extent[0].vvol_id = 0;  /* vvol_id is meaningless on the write path */

        if(write && bitmaps[0].ref) {
                /* snapshot-shared page: drop the reference and turn it into a hole */
                LSV_DBUG("ref\r\n");
                bitmaps[0].ref = 0;
                bitmaps[0].chunk_id = 0;
                bitmaps[0].chunk_page_off = 0;
        }

        /* seed the first extent from unit 0 */
        extent[ext_idx].vvol_id = bitmaps[0].vvol_id;
        extent[ext_idx].chunk_id = bitmaps[0].chunk_id;
        extent[ext_idx].chunk_off = (int)(bitmaps[0].chunk_page_off) * LSV_PAGE_SIZE;
        extent[ext_idx].length = LSV_PAGE_SIZE;
        extent[ext_idx].next = NULL;

        for (int i = 1; i < count; i++) {
                /* NOTE(review): this clears the *current* extent's vvol_id on
                 * every write-path iteration, yet the merge test below still
                 * compares vvol_id against bitmaps[i].vvol_id — confirm this
                 * interaction is intended. */
                if(write)
                        extent[ext_idx].vvol_id = 0;

                if(write && bitmaps[i].ref) {
                        LSV_DBUG("ref\r\n");
                        bitmaps[i].ref = 0;
                        bitmaps[i].chunk_id = 0;
                        bitmaps[i].chunk_page_off = 0;
                }

                if(bitmaps[i].chunk_page_off == 255)
                        assert(bitmaps[i].chunk_page_off * LSV_PAGE_SIZE == 1044480);   /* sanity check against compiler-flag / promotion surprises */

                if (bitmaps[i].chunk_id == 0 && extents[ext_idx].chunk_id == 0) {
                        /* consecutive holes merge unconditionally */
                        extent[ext_idx].length += LSV_PAGE_SIZE;
                } else if (bitmaps[i].chunk_id == extent[ext_idx].chunk_id && (int)(bitmaps[i].chunk_page_off) * LSV_PAGE_SIZE == extent[ext_idx].chunk_off + extent[ext_idx].length
                        && extent[ext_idx].chunk_off + LSV_PAGE_SIZE + extent[ext_idx].length <=  LSV_CHUNK_SIZE
                        && extent[ext_idx].vvol_id == bitmaps[i].vvol_id) {
                        /* same chunk, physically consecutive page, still within
                         * the chunk, same vvol: extend the current extent */
                        extent[ext_idx].length += LSV_PAGE_SIZE;
                } else {
                        /* discontinuity: start a new extent */
                        ext_idx++;
                        if (ext_idx >= *ext_count)
                                return -1;

                        extent[ext_idx].vvol_id = bitmaps[i].vvol_id;
                        extent[ext_idx].chunk_id = bitmaps[i].chunk_id;
                        extent[ext_idx].chunk_off = (int)(bitmaps[i].chunk_page_off) * LSV_PAGE_SIZE;
                        extent[ext_idx].length = LSV_PAGE_SIZE;
                        extent[ext_idx].next = NULL;
                }
        }

        *ext_count = ext_idx + 1;

        return 0;
}

/*
 * Inverse of row3_bitmaps_to_extents(): expand `ext_count` extents (and
 * their overflow chains) back into `count` per-page bitmap units after
 * the data has been written.  Consumes the extents: chunk_off is advanced
 * and length drained to 0 in place.
 *
 * NOTE(review): vvol_id is not written back into the units — this routine
 * is used on the write path only, where vvol_id is unused; confirm before
 * reusing it elsewhere.
 */
static inline void row3_extents_to_bitmaps(row3_extents_t *extents, uint32_t ext_count, lsv_bitmap_unit_t *bitmaps, uint32_t count)
{
        int idx = 0;

        for (int i = 0; i < ext_count; i++) {
                row3_extents_t *extent = &extents[i];
                while(extent) {
                        bitmaps[idx].chunk_page_off = extent->chunk_off / LSV_PAGE_SIZE;
                        bitmaps[idx].chunk_id = extent->chunk_id;
                        bitmaps[idx].ref = 0;   /* write path: page is now privately owned */
                        idx ++;

                        extent->chunk_off += LSV_PAGE_SIZE;
                        extent->length -= LSV_PAGE_SIZE;

                        if(!extent->length)
                                extent = extent->next;  /* follow the overflow chain */
                }
        }

        /* every page of the span must have been produced */
        assert(idx == count);
}

/*
 * Give one hole extent physical backing from the volume's append tail
 * (lsv_info->row3_tail).  When the tail chunk is unset or full, a new
 * data chunk is allocated.  A run longer than the space left in the tail
 * chunk is split: continuation nodes are heap-allocated and linked via
 * ->next (released later by row3_free_associated_extents()).
 *
 * Returns 0 on success, ENOMEM on continuation-node allocation failure,
 * or the chunk allocator's error.  Called with lsv_info->info_lock held
 * (see row3_paged_write_align).
 */
static inline int row3_allocate_to_extent(lsv_volume_proto_t *lsv_info, row3_extents_t *extent)
{
        int ret = 0;
        row3_extents_t *ext = extent;
        uint32_t left = extent->length;         /* bytes still to be backed */
        //uint32_t off = 0;

        while(ext) {
                /* tail exhausted (or never set): grab a fresh data chunk */
                if(!lsv_info->row3_tail.chunk_id || (lsv_info->row3_tail.chunk_off == LSV_CHUNK_SIZE) ) {
                        ret = lsv_volume_chunk_malloc(lsv_info, LSV_LOG_LOG_STORAGE_TYPE, &lsv_info->row3_tail.chunk_id);
                        if (unlikely(ret)) {
                                GOTO(end, ret);
                        }
                        DINFO("allocate data chunk_id:%d\r\n",lsv_info->row3_tail.chunk_id);

                        lsv_info->row3_tail.chunk_off = 0;
                }

                /* take as much of the tail chunk as the run still needs */
                uint32_t len = min_t(uint32_t, (LSV_CHUNK_SIZE-lsv_info->row3_tail.chunk_off), ext->length);
                left -= len;

                ext->chunk_id = lsv_info->row3_tail.chunk_id;
                ext->chunk_off = lsv_info->row3_tail.chunk_off;
                ext->length = len;

                lsv_info->row3_tail.chunk_off += len;

                if(left) {
                        /* run spills into the next chunk: chain a continuation node */
                        ext->next = xmalloc(sizeof(row3_extents_t));
                        if(!ext->next)
                                return ENOMEM;

                        ext->next->chunk_id = 0;
                        ext->next->chunk_off = 0;
                        ext->next->length = left;
                        ext->next->next = NULL;

                        ext = ext->next;
                }
                else
                        ext = ext->next = NULL;
        }

end:
        return ret;
}

/*
 * Ensure every extent in the array has backing storage: extents whose
 * chunk_id is still 0 (holes) are allocated space from the volume tail.
 * Sets *update_meta to 1 when any allocation happened, so the caller
 * knows the bitmap must be rewritten.
 *
 * Returns 0 on success or the allocator's error.  Caller holds
 * lsv_info->info_lock (see row3_paged_write_align).
 *
 * (Removed dead code: the old `size` accumulation and the redundant
 * `if(!size) return 0;` — both exits returned 0 at that point anyway.)
 */
static inline int row3_allocate_for_extents(lsv_volume_proto_t *lsv_info, row3_extents_t *extents, uint32_t ext_count, int *update_meta)
{
        int ret;

        for (uint32_t i = 0; i < ext_count; i++) {
                if (extents[i].chunk_id != 0)
                        continue;       /* already backed */

                ret = row3_allocate_to_extent(lsv_info, &extents[i]);
                if (unlikely(ret))
                        return ret;

                *update_meta = 1;
        }

        return 0;
}

/*
 * Free the heap-allocated continuation nodes chained behind each extent
 * (created by row3_allocate_to_extent) and clear the chain heads.  The
 * extents array itself is not freed — it belongs to the caller.
 */
static inline void row3_free_associated_extents(row3_extents_t *extents, uint32_t ext_count)
{
        for (uint32_t i = 0; i < ext_count; i++) {
                row3_extents_t *node = extents[i].next;

                extents[i].next = NULL;

                while (node != NULL) {
                        row3_extents_t *victim = node;
                        node = node->next;
                        xfree(victim);
                }
        }
}

/*
 * Snapshot-read callback: flag every returned bitmap unit as
 * snapshot-referenced so a later write triggers copy-on-write.
 */
static void row3_volume_read_snap_cb(int type, lsv_bitmap_unit_t * bitmap, int count)
{
        lsv_bitmap_unit_t *unit = bitmap;
        lsv_bitmap_unit_t *end = bitmap + count;

        (void)type;     /* unused */

        while (unit < end) {
                unit->ref = 1;
                unit++;
        }
}

/*
 * COW callback for a data chunk: set the snapshot reference flag on every
 * lsv_bitmap_unit_t packed into the LSV_CHUNK_SIZE buffer.
 */
void row3_data_cow_callback(uint8_t * dataptr)
{
        lsv_bitmap_unit_t *unit = (lsv_bitmap_unit_t *)dataptr;
        const size_t nr = LSV_CHUNK_SIZE / sizeof(lsv_bitmap_unit_t);

        DINFO("row3_data_cow_callback enter\r\n");

        for (size_t i = 0; i < nr; i++)
                unit[i].ref = 1;
}

/*
 * Hook the ROW3 callbacks into the bitmap layer: snapshot reads mark
 * units as referenced, data COW marks whole chunks.
 */
void row3_volume_start(lsv_volume_proto_t *lsv_info)
{
        lsv_bitmap_set_cow_callback(lsv_info, row3_data_cow_callback);
        lsv_bitmap_set_read_snap_callback(lsv_info, row3_volume_read_snap_cb);
}
//this is append-only.
/*int row4_paged_write_align(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buffer)
{
        uint32_t ext_count = length / LSV_PAGE_SIZE;
        uint64_t *page_idxs = xmalloc(sizeof(uint64_t) * length / LSV_PAGE_SIZE);
        row3_extents_t *extents = xmalloc(sizeof(row3_extents_t) * length / LSV_PAGE_SIZE);

        assert(page_idxs && extents);

        int ret = lsv_volume_alloc_pages(volume, page_idxs, length / LSV_PAGE_SIZE);
        if (ret) {
                free(page_idxs);
                free(extents);

                return ret;
        }

        ret = row3_pages_to_extents(page_idxs, length / LSV_PAGE_SIZE, extents, &ext_count);
        if (ret) {
                goto err;
        }

        // old bitmap, need to be freed.
        lsv_bitmap_unit_t *bitmap = xmalloc(sizeof(lsv_bitmap_unit_t) * (length / LSV_PAGE_SIZE));
        ret = lsv_bitmap_batch_read(volume, offset, length, bitmap);
        if (ret) {
                free(bitmap);

                goto err;
        }

        uint64_t _offset = offset;

        for (int i = 0; i < ext_count; i++) {
                ret = lsv_data_write_pages(volume, extents[i].offset / LSV_PAGE_SIZE, extents[i].length / LSV_PAGE_SIZE, buffer + _offset - offset);

                if (!ret) {
                        lsv_bitmap_unit_t *write_bitmap = xmalloc(sizeof(lsv_bitmap_unit_t) * (extents[i].length / LSV_PAGE_SIZE));

                        for (int j = 0; j < extents[i].length / LSV_PAGE_SIZE; j++) {
                                write_bitmap[j].page_idx = extents[i].offset / LSV_PAGE_SIZE + j;
                        }

                        ret = lsv_bitmap_batch_write(volume, _offset, extents[i].length, write_bitmap);
                        free(write_bitmap);
                }

                if (ret) {
                        free(bitmap);

                        goto err;
                }

                _offset += extents[i].length;
        }

        for (int i = 0; i < length / LSV_PAGE_SIZE; i++) {
                if (bitmap[i].page_idx) {
                        // todo. optimize.
                        int count = 1;
                        for (int j = i + 1; j < length / LSV_PAGE_SIZE; j++) {
                                if (bitmap[j].page_idx == bitmap[j - 1].page_idx + 1)
                                        count++;
                                else
                                        break;
                        }

                        lsv_volume_free_pages(volume, bitmap[i].page_idx, count);
                        i += count - 1;
                }
        }

        free(bitmap);

err:
        free(page_idxs);
        free(extents);

        return ret;
}*/

typedef struct
{
        /* Argument block for one asynchronous chunk IO; filled by the
         * submitter and consumed by row3_io_callback(). */
        co_cond_t *cond;        /* broadcast when *task_count reaches 0 */
        lsv_volume_proto_t *lsv_info;
        uint64_t vol_id;        /* real volume id; non-zero => remote read */
        uint32_t chunk_id;
        uint32_t chunk_off;     /* byte offset inside the chunk */
        int     length;         /* bytes to transfer */
        int     write;          /* 1 = write, 0 = read */
        int     *task_count;    /* shared outstanding-IO counter */
        int     ret;            /* per-IO result code */
        void    *buffer;
} row3_io_param_t;

/*
 * Coroutine body for one chunk IO submitted via row3_io_async().
 * Dispatch: write -> lsv_bitmap_write_chunk(); read with vol_id set ->
 * volume_proto_remote_read_chunk() (data lives on another volume);
 * otherwise a local lsv_bitmap_read_chunk().  Stores the result in
 * param->ret, decrements the shared task counter and wakes the waiter
 * once it reaches zero.
 */
void row3_io_callback(void *arg)
{
        row3_io_param_t *param = (row3_io_param_t *)arg;

        SCHEDULE_LEASE_SET();

        if(param->write)
                param->ret = lsv_bitmap_write_chunk(param->lsv_info, param->chunk_id, param->chunk_off, param->length,param->buffer);
        else if(unlikely(param->vol_id))
                param->ret = volume_proto_remote_read_chunk(param->lsv_info, param->vol_id, param->chunk_id, param->chunk_off, param->length,
                                                     param->buffer);
        else
                param->ret = lsv_bitmap_read_chunk(param->lsv_info, 0, param->chunk_id, param->chunk_off, param->length,param->buffer);

        LSV_DBUG("row3 io vol_id= %ju, type=%s, return, %d\r\n", param->vol_id, param->write?"write":"read", param->ret);

        (*param->task_count)--;

        DBUG("row3_paged_write_align run %d\r\n", (*param->task_count));

        /* last IO of the batch wakes the submitting coroutine */
        if(!(*param->task_count))
                co_cond_broadcast(param->cond, 0);

        //plock_unlock(&param->lock);
}

/*
 * Submit one chunk IO as a scheduler task; completion is signalled
 * through param->task_count / param->cond by row3_io_callback().
 * Always returns 0.
 */
static inline int row3_io_async(row3_io_param_t *param)
{
        DBUG("row3_paged_write_align submit %d:%d\r\n", param->chunk_id, param->chunk_off);

        schedule_task_new("row3_io_async", row3_io_callback, param, -1);
        return 0;
}

/*
 * Dispatch the lock level chosen by row3_lock_probe() to the matching
 * lock routine.  ROW3_LOCK_LEVEL_NO_LOCK (and any other level) is a
 * no-op returning 0.
 */
int row3_volume_io_lock(lsv_volume_proto_t *lsv_info, int level, uint64_t offset, uint32_t length)
{
        volume_proto_t *vp = lsv_info->volume_proto;

        switch (level) {
        case ROW3_LOCK_LEVEL_SPAN_RLOCK:
                return row3_span_rdlock(vp, offset, length);
        case ROW3_LOCK_LEVEL_SPAN_WLOCK:
                return row3_span_wrlock(vp, offset, length);
        case ROW3_LOCK_LEVEL_BITMAP_COW_LOCK:
                return row3_bitmap_span_wrlock1(vp, offset, length);
        default:
                return 0;
        }
}

/*
 * Counterpart of row3_volume_io_lock(): release whatever that level
 * acquired.  Read and write span locks share one unlock path.
 * Always returns 0.
 */
int row3_volume_io_unlock(lsv_volume_proto_t *lsv_info, int level, uint64_t offset, uint32_t length)
{
        volume_proto_t *vp = lsv_info->volume_proto;

        switch (level) {
        case ROW3_LOCK_LEVEL_SPAN_RLOCK:
        case ROW3_LOCK_LEVEL_SPAN_WLOCK:
                row3_span_unlock(vp, offset, length);
                break;
        case ROW3_LOCK_LEVEL_BITMAP_COW_LOCK:
                row3_bitmap_span_unlock1(vp, offset, length);
                break;
        default:
                break;  /* ROW3_LOCK_LEVEL_NO_LOCK: nothing held */
        }

        return 0;
}

/*
 * Decide — once per IO — which lock level a write needs, caching the
 * decision in lock_switch.  A pending bitmap COW forces
 * BITMAP_COW_LOCK; any hole or snapshot-referenced page in the span
 * forces SPAN_WLOCK; otherwise the freshly-read bitmap is kept in
 * lock_switch->user_data for reuse by row3_paged_write_align().
 * Returns 0 on success or the bitmap-read error.
 */
static inline int row3_lock_probe(lsv_volume_proto_t *lsv_info, uint64_t offset, uint32_t length, row3_lock_switch_t *lock_switch)
{
        int ret, i;
        uint64_t new_offset;
        uint32_t new_length;

        if(!lock_switch->lock_set) { /*not probed yet*/
                lock_switch->lock_set = 1; /*set only once*/
                range_align(LSV_PAGE_SIZE, offset, length, &new_offset, &new_length);

                if(lsv_bitmap_will_cow(lsv_info, new_offset, new_length))
                {
                        lock_switch->lock_level = ROW3_LOCK_LEVEL_BITMAP_COW_LOCK;
                        return 0;
                }

                lsv_bitmap_unit_t *bitmap;
                ring_pool_pop(lsv_info->row3_rings.bitmap_ring, (void **)&bitmap, 1);

                ret = lsv_bitmap_batch_read(lsv_info, new_offset, new_length, bitmap);
                if (unlikely(ret))
                {
                        ring_pool_push(lsv_info->row3_rings.bitmap_ring, (void **)&bitmap, 1);

                        DERROR("read bitmap error, err=%d\r\n", ret);

                        return ret;
                }

                /* BUG FIX: this loop previously tested bitmap->chunk_id and
                 * bitmap->ref on every iteration — always unit 0, since the
                 * pointer was never advanced.  Each unit must be examined. */
                for(i =0;i<new_length / LSV_PAGE_SIZE;i++)
                {
                        if(bitmap[i].chunk_id == 0 || bitmap[i].ref)
                        {
                                ring_pool_push(lsv_info->row3_rings.bitmap_ring, (void **)&bitmap, 1);

                                lock_switch->user_data = NULL;
                                lock_switch->lock_level = ROW3_LOCK_LEVEL_SPAN_WLOCK;

                                return 0; 
                        }
                }

                if(offset != new_offset || length != new_length)
                {
                        /* unaligned request: still needs the span write lock,
                         * but the bitmap can be kept for reuse */
                        lock_switch->lock_level = ROW3_LOCK_LEVEL_SPAN_WLOCK;
                }

                /* NOTE(review): on the aligned, fully-mapped path lock_level
                 * is left untouched here — confirm callers initialize it
                 * (e.g. to ROW3_LOCK_LEVEL_NO_LOCK) before probing. */
                lock_switch->user_data = bitmap;

                return 0; /* bitmap intentionally not returned to the ring */
        }

        return 0;
}

#define LSV_BITMAP_DBG 1

/*
 * Write one page-aligned span [offset, offset + length) of the volume.
 *
 * Flow:
 *   1. resolve any bitmap COW for the range (unless the lock probe
 *      already supplied a COW-free bitmap in lock_switch->user_data),
 *   2. read the bitmap units and coalesce them into extents,
 *   3. allocate backing chunks for hole extents (under info_lock),
 *   4. issue the chunk writes — asynchronously when there is more than
 *      one extent, synchronously for the single-extent fast path,
 *   5. fold the extents back into the bitmap and persist it when new
 *      chunks were allocated.
 *
 * Scratch buffers (bitmap, extents, io params) come from the row3 ring
 * pools and are returned on every exit path.  Returns 0 or an error code.
 */
int row3_paged_write_align(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buffer, row3_lock_switch_t *lock_switch)
{
        int ret = 0;
        int update_meta = 0;
        uint32_t ext_count = length / LSV_PAGE_SIZE;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        lsv_bitmap_unit_t *bitmap;
        row3_extents_t *extents;
        int has_bitmap = 0;

#if LSV_LOW_VOL_TEST2
        return row_test_volume_write_align(volume_proto, offset, length, buffer);
#endif

        /* ring-pool entries are fixed-size; guard against oversized spans */
        assert(sizeof(lsv_bitmap_unit_t) * (length / LSV_PAGE_SIZE) < 4096);
        assert(sizeof(row3_extents_t) * (length / LSV_PAGE_SIZE) < 8192);

        ret = ring_pool_pop(lsv_info->row3_rings.extent_ring, (void **)&extents, 1);
        assert(ret == 0);

#if !ROW3_LOCK_OPTIMIZE
        ret = lsv_bitmap_do_cow(lsv_info, offset, length);
        if(unlikely(ret)) {
                /* BUG FIX: this path previously jumped to err_ret without
                 * returning the popped extents entry to its ring pool. */
                ring_pool_push(lsv_info->row3_rings.extent_ring, (void **)&extents, 1);
                GOTO(err_ret, ret);
        }
#else
        /* BUG FIX: the field is user_data, not userdata — this branch did
         * not compile when ROW3_LOCK_OPTIMIZE was enabled. */
        if(lock_switch->user_data)
                has_bitmap = 1;
#endif

        if(unlikely(!has_bitmap)) {
                ret = ring_pool_pop(lsv_info->row3_rings.bitmap_ring, (void **)&bitmap, 1);
                assert(ret == 0);

                ret = lsv_bitmap_batch_read(lsv_info, offset, length, bitmap);
                if (unlikely(ret)) {
                        GOTO(err_free1, ret);
                }
        }
        else {
                /* reuse the bitmap the lock probe already read */
                bitmap = lock_switch->user_data;
        }

        /* coalesce the units into extents; write path resets COW refs */
        ret = row3_bitmaps_to_extents(bitmap, length / LSV_PAGE_SIZE, extents, &ext_count, 1);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* allocate backing for hole extents under the volume info lock */
        ret = lsv_wrlock(&lsv_info->info_lock);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = row3_allocate_for_extents(lsv_info, extents, ext_count, &update_meta);
        if (unlikely(ret)) {
                lsv_unlock(&lsv_info->info_lock);
                GOTO(err_release, ret);
        }

        lsv_unlock(&lsv_info->info_lock);

        row3_io_param_t * param;

        assert(sizeof(row3_io_param_t) * (length / LSV_PAGE_SIZE) < 32768);

        ret = ring_pool_pop(lsv_info->row3_rings.async_ring, (void **)&param, 1);
        assert(ret == 0);

        int task_count = 0;     /* outstanding async IOs, decremented by callbacks */
        int check_count = 0;    /* total IOs issued, for result checking */

        co_cond_t cond;
        co_cond_init(&cond);

        uint64_t _offset = offset;

        if(ext_count > 1 || extents[0].next) {
                /* multiple extents: fan the writes out as scheduler tasks */
                for (int i = 0; i < ext_count; i++) {
                        row3_extents_t *extent = &extents[i];

                        while(extent) {
                                if (extent->chunk_id) {
                                        param[task_count].lsv_info = lsv_info;
                                        param[task_count].vol_id = 0;  /* meaningless for writes */
                                        param[task_count].chunk_id = extent->chunk_id;
                                        param[task_count].chunk_off = extent->chunk_off;
                                        param[task_count].length = extent->length;
                                        param[task_count].buffer = (lsv_s8_t *)buffer + _offset - offset;
                                        param[task_count].write = 1;
                                        param[task_count].task_count = &task_count;
                                        param[task_count].cond = &cond;

                                        task_count ++;
                                        check_count ++;
                                }
                                else
                                        assert(0);      /* every extent was just backed */

                                _offset += extent->length;
                                extent = extent->next;
                        }
                }

                for(int i=0;i<check_count;i++)
                        row3_io_async(&param[i]);

                while (task_count > 0) {
                        co_cond_wait2(&cond, "row3_paged_write_align");
                }
        }
        else {
                /* single extent: write synchronously, skip the scheduler */
                param[0].ret = lsv_bitmap_write_chunk(lsv_info, extents[0].chunk_id, extents[0].chunk_off, extents[0].length, buffer);
                check_count = 1;
        }

        DBUG("row3_paged_write_align end\r\n");

        /* fold the (consumed) extents back into per-page bitmap units */
        row3_extents_to_bitmaps(extents, ext_count, bitmap, (length / LSV_PAGE_SIZE));

        for(int i=0;i<check_count;i++) {
                if (unlikely(param[i].ret)) {
                        ret = param[i].ret;
                        DERROR("%d. row3 write err! ret %d\n", i, ret);
                        row3_free_associated_extents(extents, ext_count);
                        GOTO(err_free2, ret);
                }
        }

        row3_free_associated_extents(extents, ext_count);
        ring_pool_push(lsv_info->row3_rings.async_ring, (void **)&param, 1);

        if (update_meta) {
                /* new chunks were allocated: persist the updated bitmap */
                ret = lsv_bitmap_batch_write(lsv_info, offset, length, bitmap);
#if LSV_BITMAP_DBG
                lsv_bitmap_unit_t *u = bitmap;
                for (int i = 0; i < length; i += LSV_PAGE_SIZE) {
                        YASSERT(u->chunk_id);
                        LSV_DBUG("bitmap write, off:%lu, %d. id %u paged_off %u\n", offset, i, u->chunk_id, u->chunk_page_off);
                        u++;
                }
#endif
        }

        ring_pool_push(lsv_info->row3_rings.bitmap_ring, (void **)&bitmap, 1);
        ring_pool_push(lsv_info->row3_rings.extent_ring, (void **)&extents, 1);
        return ret;
err_free2:
        ring_pool_push(lsv_info->row3_rings.async_ring, (void **)&param, 1);
err_release:
err_free1:
        ring_pool_push(lsv_info->row3_rings.bitmap_ring, (void **)&bitmap, 1);
        ring_pool_push(lsv_info->row3_rings.extent_ring, (void **)&extents, 1);
err_ret:
        return ret;
}


/**
 * row3_paged_read_align - read a page-aligned span of the volume.
 *
 * @offset and @length must be multiples of LSV_PAGE_SIZE (one bitmap
 * unit describes one page).  The bitmap units are merged into contiguous
 * chunk extents, one async read is issued per extent, and the routine
 * blocks on a coroutine condition until every read has completed.
 *
 * Pages whose bitmap entry has chunk_id == 0 (never written) are
 * skipped, so the corresponding ranges of @buffer are left untouched --
 * the caller is responsible for any zero-filling it needs.
 *
 * Returns 0 on success, an error code otherwise.
 */
int row3_paged_read_align(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buffer)
{
        int ret;
        uint32_t ext_count = length / LSV_PAGE_SIZE;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint32_t vvol_id = ((lsv_bitmap_context_t *)(lsv_info->bitmap_context))->bitmap_header->vvol_id;

#if LSV_LOW_VOL_TEST2
        return row_test_volume_read_align(volume_proto, offset, length, buffer);
#endif
        /* one bitmap unit / one extent slot per page; xmalloc presumably
         * aborts rather than returning NULL -- TODO confirm */
        lsv_bitmap_unit_t *bitmap = xmalloc(sizeof(lsv_bitmap_unit_t) * (length / LSV_PAGE_SIZE));
        row3_extents_t *extents = xmalloc(sizeof(row3_extents_t) * (length / LSV_PAGE_SIZE));

        ret = lsv_bitmap_batch_read(lsv_info, offset, length, bitmap);
        if (unlikely(ret)) {
                GOTO(err_free1, ret);
        }

#if LSV_BITMAP_DBG
        lsv_bitmap_unit_t *u = bitmap;
        for (uint32_t i = 0; i < length; i += LSV_PAGE_SIZE) {
                LSV_DBUG("bitmap read, off:%lu, %u. id %u paged_off %u\n", offset, i, u->chunk_id, u->chunk_page_off);
                u++;
        }
#endif

        /* coalesce per-page bitmap units into contiguous chunk extents */
        ret = row3_bitmaps_to_extents(bitmap, length / LSV_PAGE_SIZE, extents, &ext_count, 0);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        row3_io_param_t *param = xmalloc(sizeof(row3_io_param_t) * (length / LSV_PAGE_SIZE));
        int task_count = 0;     /* live requests; decremented by each completion */
        int check_count = 0;    /* stable count of issued requests */

        co_cond_t cond;
        co_cond_init(&cond);

        uint64_t _offset = offset;
        for (uint32_t i = 0; i < ext_count; i++) {
                if (extents[i].chunk_id) { /* chunk_id == 0: hole, skip */

                        /* vol_id 0 means "this volume"; otherwise map the
                         * virtual volume id to the backing volume id */
                        if (extents[i].vvol_id == vvol_id)
                                param[task_count].vol_id = 0;
                        else
                                lsv_bitmap_vvol_to_vol(lsv_info, extents[i].vvol_id, &param[task_count].vol_id);

                        param[task_count].lsv_info = lsv_info;

                        param[task_count].chunk_id = extents[i].chunk_id;
                        param[task_count].chunk_off = extents[i].chunk_off;
                        param[task_count].length = extents[i].length;
                        param[task_count].buffer = (lsv_s8_t *)buffer + _offset - offset;
                        param[task_count].write = 0;    /* read request */
                        param[task_count].task_count = &task_count;
                        param[task_count].cond = &cond;

                        task_count++;
                        check_count++;
                }

                _offset += extents[i].length;
        }

        for (int i = 0; i < check_count; i++)
                row3_io_async(&param[i]);

        /* each completion decrements task_count and signals cond */
        while (task_count > 0) {
                co_cond_wait2(&cond, "row3_paged_read_align");
        }

        for (int i = 0; i < check_count; i++) {
                if (unlikely(param[i].ret)) {
                        ret = param[i].ret;
                        /* BUGFIX: message previously said "write" on the read path */
                        DERROR("%d. row3 read err! ret %d\n", i, ret);
                        GOTO(err_free2, ret);
                }
        }

        xfree(param);

#ifdef CACHE_NEW
        lsv_bitmap_batch_release(lsv_info, offset, length, bitmap);
#else
#endif

        xfree(bitmap);
        xfree(extents);
        return 0;
err_free2:
        xfree(param);
err_release:
//        lsv_bitmap_batch_release(lsv_info, offset, length, bitmap);
err_free1:
        xfree(bitmap);
        xfree(extents);
        return ret;
}

/* Buffer-release callback handed to mbuffer_attach_with(): returns the
 * attached read buffer to the allocator once the mbuffer is destroyed. */
void row3_mem_free_handler(void *ptr)
{
        xfree(ptr);
}

/**
 * row3_volume_proto_read - volume read entry point.
 *
 * Rounds the requested [offset, offset+size) range out to LSV_PAGE_SIZE
 * boundaries, reads the aligned span into a bounce buffer under the
 * span/bitmap locks, then hands the caller's sub-range back via @buf
 * (copy, or zero-copy attach when huge pages are enabled).
 *
 * Returns 0 on success, an error code otherwise.
 *
 * BUGFIX: the bounce buffer used to be allocated before the locks were
 * taken, so the err_span_lock / err_bmp_lock exits leaked it.  It is now
 * allocated only after all locks are held, so every early-error path is
 * allocation-free.
 */
int row3_volume_proto_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret = 0;
        uint64_t off_new = io->offset;
        uint32_t len_new = io->size;
        uint8_t *_buffer;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;

        #if LSV_LOW_VOL_TEST
        return row_test_volume_read(volume_proto, io, buf);
        #endif

        assert(lsv_bitmap_is_ready(lsv_info));

        ret = lsv_rdlock(&lsv_info->io_lock);// lock for snapshots, resizing etc.
        assert(!ret);

        /* round the request out to page boundaries */
        if ((io->offset % LSV_PAGE_SIZE) == 0 && (io->size % LSV_PAGE_SIZE) == 0) {
                off_new = io->offset;
                len_new = io->size;
        } else {
                range_align(LSV_PAGE_SIZE, io->offset, io->size, &off_new, &len_new);
        }

        ret = row3_span_rdlock(volume_proto, off_new, len_new);
        if(unlikely(ret))
                goto err_span_lock;

#if !ROW3_LOCK_OPTIMIZE
        /* NOTE(review): takes the bitmap span *write* lock on the read
         * path -- presumably to serialize against COW; confirm a rdlock
         * would not suffice before changing it. */
        ret = row3_bitmap_span_wrlock(volume_proto, off_new, len_new);
        if(ret)
                goto err_bmp_lock;
#endif
        /* allocate only once every lock is held, so the error exits
         * above never have a buffer to leak */
        _buffer = xmalloc(len_new);

        ret = row3_paged_read_align(volume_proto, off_new, len_new, _buffer);
        if (unlikely(ret)) {
                xfree(_buffer);
                GOTO(un_lock, ret);
        }

#if !LSV_USE_HUGE_PAGE
        /* copy the caller's sub-range out of the aligned bounce buffer */
        mbuffer_copy(buf, (const char *)_buffer + io->offset - off_new, io->size);
        xfree(_buffer);
#else
        /* zero-copy: mbuffer takes ownership and frees via the handler */
        mbuffer_attach_with(buf, (char *)_buffer + io->offset - off_new, io->size, _buffer, row3_mem_free_handler);
#endif

        YASSERT(buf->len == io->size);

un_lock:
        row3_bitmap_span_unlock(volume_proto, off_new, len_new);
err_bmp_lock:
        row3_span_unlock(volume_proto, off_new, len_new);
err_span_lock:
        lsv_unlock(&lsv_info->io_lock);

        return ret;
}

/**
 * row3_volume_proto_write - volume write entry point.
 *
 * Rounds the request out to LSV_PAGE_SIZE boundaries and writes under the
 * span lock plus a bitmap span lock (write lock only when the range will
 * COW, read lock otherwise).  Aligned single-segment writes go straight
 * from the mbuffer segment (zero copy); everything else goes through a
 * bounce buffer, with a read-modify-write of the partial head/tail pages
 * for unaligned requests.
 *
 * Returns 0 on success, an error code otherwise.
 */
int row3_volume_proto_write(volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf)
{
        int ret = 0;
        uint64_t off_new = io->offset;
        uint32_t len_new = io->size;
        uint8_t *_buffer;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        /* passed through to row3_paged_write_align; zero-initialized =
         * ROW3_LOCK_LEVEL_NO_LOCK, lock_set = 0, no user data */
        row3_lock_switch_t lock_switch = {0};
#if LSV_LOW_VOL_TEST
        return row_test_volume_write(volume_proto, io, buf);
#endif

        assert(lsv_bitmap_is_ready(lsv_info));

        //ret = lsv_wrlock(&lsv_info->io_lock);
        ret = lsv_rdlock(&lsv_info->io_lock);// lock for snapshots, resizing etc.
        assert(!ret);

        /* round the request out to page boundaries */
         if (likely((io->offset % LSV_PAGE_SIZE) == 0 && (io->size % LSV_PAGE_SIZE) == 0)) {
                off_new = io->offset;
                len_new = io->size;
         } else
                range_align(LSV_PAGE_SIZE, io->offset, io->size, &off_new, &len_new);

#if ROW3_LOCK_OPTIMIZE
        /* NOTE(review): dead code -- ROW3_LOCK_OPTIMIZE is 0 at the top of
         * this file.  If ever enabled, this branch will not compile as-is:
         * it references `volume`, `lock_probe`, `offset`, `length` and
         * `old_level`, none of which are declared in this function (only
         * `lock_switch`, `off_new`, `len_new` exist).  It also returns
         * early without releasing lsv_info->io_lock taken above. */

        lock_switch.lock_level = volume->driver.wlock_level;
        lock_probe.lock_set = 0;
        lock_probe.user_data = NULL;

relock:
        row3_volume_io_lock(lsv_info, lock_probe.lock_level, offset, length);

        old_level = lock_probe.lock_level;
        ret = row3_lock_probe(lsv_info, offset, length, &lock_probe);
        if(ret)
        {
                row3_volume_io_unlock(lsv_info, old_level, offset, length);
                return ret;
        }

        if(old_level != lock_probe.lock_level)
        {
                /* probe escalated the required level; retry with it */
                row3_volume_io_unlock(lsv_info, old_level, offset, length);

                goto relock;
        }

        if(lock_probe.lock_level == LOCK_LEVEL_BITMAP_COW_LOCK)
        {
                ret = row3_bitmap_do_cow(lsv_info, offset, length);
                if(ret)
                {
                        row3_volume_io_unlock(lsv_info, old_level, offset, length);
                        return ret;
                }
        }

#else
        ret = row3_span_wrlock(volume_proto, off_new, len_new);
        if(unlikely(ret))
                goto err_span_lock;

        /* write lock only if this range will copy-on-write; plain
         * overwrites share the bitmap span with a read lock */
        if (unlikely(lsv_bitmap_will_cow(lsv_info, off_new, len_new)))
                ret = row3_bitmap_span_wrlock(volume_proto, off_new, len_new);
        else
                ret = row3_bitmap_span_rdlock(volume_proto, off_new, len_new);
        if(unlikely(ret))
                goto err_bmp_lock;
#endif

        if (likely((io->offset % LSV_PAGE_SIZE) == 0 && (io->size % LSV_PAGE_SIZE) == 0)) {
                if (unlikely(mbuffer_segcount((buffer_t *)buf) > 1)) {
                        /* multi-segment: flatten into a bounce buffer first */
                        _buffer = xmalloc(len_new);

                        mbuffer_get(buf, _buffer, io->size);

                        ret = row3_paged_write_align(volume_proto, off_new, len_new, _buffer, &lock_switch);

                        xfree(_buffer);
                } else {
                        struct list_head *pos, *n;
                        seg_t *seg;

                        /* single segment: write directly from its backing
                         * pointer, no copy; loop breaks after the first seg */
                        list_for_each_safe(pos, n, &buf->list) {
                                seg = (seg_t *)pos;
                                ret = row3_paged_write_align(volume_proto, off_new, len_new, seg->handler.ptr, &lock_switch);

                                break;
                        }
                }
        } else {
                /* unaligned: read-modify-write through a bounce buffer */
                _buffer = xmalloc(len_new);

                if (len_new > 16 * 1024) {
                        /* large write: only the partial head/tail pages need
                         * pre-reading; the middle is fully overwritten by
                         * mbuffer_get() below */
                        if ((io->offset % LSV_PAGE_SIZE) != 0) {
                                ret = row3_paged_read_align(volume_proto, off_new, LSV_PAGE_SIZE, _buffer);
                                if (unlikely(ret)) {
                                        xfree(_buffer);
                                        goto un_lock;
                                }
                        }

                        if (((io->offset + io->size) % LSV_PAGE_SIZE) != 0) {
                                ret = row3_paged_read_align(volume_proto, off_new + len_new - LSV_PAGE_SIZE, LSV_PAGE_SIZE,
                                                                     _buffer + len_new - LSV_PAGE_SIZE);
                                if (unlikely(ret)) {
                                        xfree(_buffer);
                                        goto un_lock;
                                }
                        }
                } else
                        /* small write: cheaper to pre-read the whole span */
                        ret = row3_paged_read_align(volume_proto, off_new, len_new, _buffer);

                if (unlikely(ret)) {
                        xfree(_buffer);

                        goto un_lock;
                }

                /* overlay the caller's data at its true offset */
                mbuffer_get(buf, _buffer + io->offset - off_new, io->size);

                ret = row3_paged_write_align(volume_proto, off_new, len_new, _buffer, &lock_switch);

                xfree(_buffer);
        }

un_lock:
        row3_bitmap_span_unlock(volume_proto, off_new, len_new);
err_bmp_lock:
        row3_span_unlock(volume_proto, off_new, len_new);
err_span_lock:
        lsv_unlock(&lsv_info->io_lock);

        return ret;
}
